package com.lyon.demo.protocol.netty.core;

import cn.hutool.core.lang.Pair;
import cn.hutool.core.util.SerializeUtil;
import cn.hutool.json.JSONUtil;
import com.lyon.demo.protocol.api.core.Command;
import com.lyon.demo.protocol.api.core.RemotingCallback;
import com.lyon.demo.protocol.api.core.RequestStatus;
import com.lyon.demo.protocol.api.core.command.ResponseFuture;
import com.lyon.demo.protocol.api.remoting.AbstractRemotingService;
import com.lyon.demo.protocol.api.remoting.AsyncRequestProcessor;
import com.lyon.demo.protocol.api.remoting.RequestProcessor;
import com.lyon.demo.protocol.netty.command.RemoteCommand;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
 * @author LeeYan9
 * @since 2022-05-25
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Slf4j
public abstract class AbstractNettyRemotingService<T extends Command> extends AbstractRemotingService<ChannelHandlerContext, T> {

    @Override
    protected void processRequest(ChannelHandlerContext ctx, T command) {
        int requestCode = command.handlerCode();
        Pair<RequestProcessor<ChannelHandlerContext, T>, ExecutorService> match = this.requestProcessorTable.get(requestCode);
        if (Objects.isNull(match)) {
            log.error("执行请求处理失败,未找到对应处理器 {}-{}", command.getRequestId(), command.handlerCode());
            T response = SerializeUtil.clone(command);
            response.getStatus(RequestStatus.UNKNOWN_HANDLER_CODE);
            ctx.writeAndFlush(response);
            return;
        }
        RequestProcessor<ChannelHandlerContext, T> processor = match.getKey();
        ExecutorService executorService = match.getValue();

        Runnable runnable = () -> {
            T response = null;
            try {
                doBeforeHook(command);
                RemotingCallback<T> callback = callbackResponse -> doAfterHook(command, callbackResponse);
                if (processor instanceof AsyncRequestProcessor) {
                    ((AsyncRequestProcessor) processor).asyncProcessRequest(ctx, command, callback);
                } else {
                    response = processor.processRequest(ctx, command);
                    callback.callback(response);
                }
            } catch (Exception e) {
                log.error("执行请求处理失败01", e);
                if (response == null) {
                    response = SerializeUtil.clone(command);
                    response.getStatus(RequestStatus.FAILURE);
                }
                ctx.writeAndFlush(response);
            }
        };
        try {
            if (executorService == null) {
                runnable.run();
            } else {
                executorService.execute(runnable);
            }
        } catch (Exception e) {
            log.error("执行请求处理失败03", e);
            T response = SerializeUtil.clone(command);
            response.getStatus(RequestStatus.FAILURE);
            ctx.writeAndFlush(response);
        }
    }

    @Override
    protected void processResponse(ChannelHandlerContext ctx, T command) {
        long requestId = command.getRequestId();
        log.debug("请求id[{}]-接收到响应请求", requestId);
        ResponseFuture responseFuture = responseTable.get(requestId);
        if (Objects.isNull(responseFuture)) {
            log.warn("请求id[{}]-找不到对应的响应异步返回器-[{}]", requestId, JSONUtil.toJsonStr(((RemoteCommand)command).parseData()));
            return;
        }
        executeCallback(responseFuture, command);
    }

    private void executeCallback(ResponseFuture responseFuture, T command) {
        try {
            Runnable runnable = () -> {
                responseFuture.complete(command);
                RemotingCallback callback = responseFuture.getCallback();

                log.debug("请求id[{}]-准备执行回调-[{}]", responseFuture.getRequestId(), JSONUtil.toJsonStr(((RemoteCommand)command).parseData()));
                if (Objects.nonNull(callback)) {
                    callback.callback(responseFuture);
                }
            };
            ExecutorService callbackExecutor = getCallbackExecutor();
            if (callbackExecutor != null) {
                callbackExecutor.submit(runnable);
            } else {
                runnable.run();
            }
        } catch (Exception ex) {
            log.error("执行回调失败", ex);
        } finally {
            responseTable.remove(responseFuture.getRequestId());
        }
    }
}
